The dot-pipe equivalence
委譲についての一つの考え方として、「ドット・パイプの等価性」というものがあります。.シンタックスを用いたクエリ演算子の流暢な合成は、計算の実行を分解して分散させる機会と捉えることができます。ストリーム処理プラットフォームでこれを行うためには、上流の計算機から下流の残余の計算機にイベントを輸送するためのデータフローチャネルを挿入する必要があります。
One way of thinking about delegation is the “dot-pipe equivalence”. The syntactic fluent composition of query operators using . syntax can be seen as an opportunity to decompose and distribute execution of the computation. In order to do so in a stream processing platform, a data flow channels needs to be inserted to transport the events from the upstream computation to the downstream residual computation.
例として、次のようなアカデミックなクエリを考えてみましょう。
As an example, consider the following academic query:
code:query
xs.Where(x => x > 0).Select(x => x + 1).Subscribe(o)
多くの場合、ストリーム処理プラットフォームには、ストリームxsからイベントを受信し、オブザーバーoにイベントを送信する責任を負う「入口」と「出口」のシステムがあります。基本的に、これにより、ストリーム処理プラットフォームと外部の世界の端に、すでに2つの暗黙のパイプが確立されます。
Often, a stream processing platform has an “ingress” and “egress” system responsible for receiving events from the stream xs and to send events to the observer o. Essentially, this establishes two implicit pipes already, on the edge of the stream processing platform and the outside world.
しかし、クエリ式をサブクエリ式に書き換えることで、実行自体を分散させることも可能です。上の例では、最大分解(外界へのエッジを無視して、つまりパブリッシャーをxsに、レシーバーをoに)すると以下のようになります。
However, the execution itself can also be distributed by rewriting the query expression into subquery expressions. In the example above, the maximum decomposition (ignoring edges to the outside world, i.e. publishers into xs and receivers on o) would look as follows:
code:C#
var d1 = t1.Subscribe(o);
var d2 = t2.Select(x => x + 1).Subscribe(t1);
var d3 = xs.Where(x => x > 0).Subscribe(t2);
上に示した表現は、プレーンなRxシンタックスであり、明示的な識別子を持つ非同期サブスクリプションのIRPに相当するものが明らかになるはずです。
The representation shown above is plain Rx syntax; the IRP equivalent of asynchronous subscriptions with explicit identifiers should be apparent:
code:C#
var d1 = await t1.SubscribeAsync("d1", o);
var d2 = await t2.Select(x => x + 1).SubscribeAsync("d2", t1);
var d3 = await xs.Where(x => x > 0).SubscribeAsync("d3", t2);
この実行計画では、t1とt2は、元のクエリ式のドットに相当するパイプである中間ストリームです。
In this execution plan, t1 and t2 are intermediate streams which are the pipe equivalent to the dots in the original query expression:
code:query
xs.Where(x => x > 0) ()==t2==() Select (x => x + 1) ()==t1==() Subscribe(o)
これらのパイプは、計算ノード間の1対1の通信に特化したIRPサービス内の内部専用ストリームとして作成することができます。これらのパイプの寿命は、Finally(パイプを削除するための最終的な副作用を追加する)やDefer(パイプを作成するための初期的な副作用を追加する)などの演算子を使用して、クエリ式をさらに書き換えることで管理できます。
These can be created as internal-only streams within the IRP service, specialized towards a 1:1 communication across computation nodes. Lifetime of these pipes can be managed by further rewriting the query expression using operators such as Finally (to add terminal side-effects to delete pipes) and Defer (to add initial side-effects to create pipes).
これは、例えば、Map/Reduceフレームワークにおいて、全体的なクエリインテントのサブエクスプレッションを評価した結果を保持するために、中間コレクションが割り当てられるような、静止状態のデータに対する分散クエリ実行計画と非常によく似ていることに注意してください。このような中間スクラッチパッドの割り当て/割り当て解除には、セントラルコーディネーター、ピアツーピアアプローチ(上流が割り当て、下流が解除)、あるいはライフタイムベースのGCアプローチ(継続的なストリーミングシステムにはあまり適用できませんが、リースベースの技術を使用することができます)など、さまざまなアプローチを使用できます。
Note this is very similar to distributed query execution planning for data at rest, e.g. in a map/reduce framework where intermediate collections are allocated to hold the results of evaluating subexpressions of the overall query intent. Various approaches can be used to allocate/deallocate such intermediate scratch pads: a central coordinator, a peer-to-peer approach (upstream allocates, downstream deallocates), or even a lifetime-based GC approach (less applicable to continuous streaming systems, though lease-based techniques could be used).
委譲の文脈では、部分式は他の IRP システムに委譲されるため、これらのシステム間でデータ・フロー・チャネルを作成する必要があり、その寿命はクエリ全体の寿命によって制御されます。IRPの抽象化を使用することで、これを行うために必要なツールが提供されます。すなわち、IRPサービスにおける能力の発見、委譲される部分式のためのアーティファクトを定義する能力、そして、クエリを構成し、ホットランニングコンピュテーションを作成(開始)および削除(停止)するDDLオペレーションを実行するために、IRPサービスで定義されたアーティファクトへのプロキシを取得する能力があります。
In the context of delegation, subexpressions get delegated to other IRP systems, thus requiring a data flow channel to be created between those systems, whose lifetime is controlled by the lifetime of the overall query. The use of IRP abstractions provides the necessary tools to be able to do this, namely discovery of capabilities in IRP services, the ability to define artifacts for subexpressions being delegates, and the ability to obtain proxies to artifacts defined in IRP services in order to compose queries and perform DDL operations to create (start) and delete (stop) hot running computations.
IRPシステム間の境界を横断する使用可能なパイプの作成を可能にするために、これらのシステムは、例えば、HTTPベースのパイプ、ウェブフック、中間の信頼性のある永続的なキューなどに支えられた、他のIRPサービスにイベントを受信および送信するための1:1パイプを可能にする観察可能なプロキシおよびオブザーバープロキシの利用可能性に同意することができます。この場合も、メタデータ機能を使用して両端の利用可能なパイプを問い合わせ、データフローを設定するための適切な手段を決定して、効果的にハンドシェイクを実現することができます。発見されたアーティファクトに関連するメタデータは、サポートされている暗号化、利用可能なQPS、信頼性保証などの情報をさらに明らかにすることができます。
In order to enable the creation of usable pipes to cross the boundary between IRP systems, such systems can agree on the availability of observable and observer proxies that enable 1:1 pipes to receive and send events to other IRP services, for example backed by HTTP-based pipes, web hooks, intermediate reliable and persisted queues, etc. Again, the metadata capability can be used to inquire about available pipes on both ends in order to determine a suitable means to set up the data flow, effectively achieving a handshake. Metadata associated with discovered artifacts can further reveal information about supported encryption, available QPS, reliability guarantees, etc.